package main

import (
	"bufio"
	"deduplicate/config"
	"deduplicate/util"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

var (
	readLines               = 0                   //lines read from the data file; must equal writeTmpLines
	writeTmpLines     int64 = 0                   //lines written to tmp files; must equal readLines
	writeResultLines  int64 = 0                   //unique (non-duplicate) lines written out
	mergeThreadCount  int32 = 0                   //number of dedup goroutines currently running
	tmpDedupDoneCount int32 = 0                   //number of tmp files whose dedup has completed
	doneCh                  = make(chan struct{}) //signals that writing the RESULT file has finished
	splitDoneCh             = make(chan struct{}) //signals that tmp-file splitting has finished
	tmpDir                  = filepath.Dir(config.Conf.DataFilePath) + "/" + "temp"
	confDataFilePath        = config.Conf.DataFilePath
	confResultFilePath = config.Conf.ResultFilePath
)

//main runs the dedup pipeline:
// 1. split the data file into n smaller files;
// 2. lines are distributed by hash(line)%n, so duplicates always land in
//    the same temp file;
// 3. each temp file is deduplicated independently;
// 4. the deduplicated temp files are merged into the result file.
func main() {
	getwd, err := os.Getwd()
	if err != nil {
		util.Log(err)
	}
	if len(os.Args) > 2 {
		// Paths supplied on the command line (relative to the working
		// directory) take precedence. The original code fell through and
		// unconditionally overwrote these with the defaults below, making
		// the arguments dead.
		config.Conf.DataFilePath = getwd + "/" + os.Args[1]
		config.Conf.ResultFilePath = getwd + "/" + os.Args[2]
	} else {
		// Default: data.txt in the current working directory.
		config.Conf.DataFilePath = getwd + "/" + "data.txt"
		config.Conf.ResultFilePath = getwd + "/" + "data_dedup.txt"
	}
	tmpDir = filepath.Dir(config.Conf.DataFilePath) + "/" + "temp"

	_ = os.Remove(config.Conf.ResultFilePath)
	_, err = os.Stat(config.Conf.DataFilePath)
	if err != nil {
		if os.IsNotExist(err) { //no data file at the chosen path: fall back to the paths from the config file
			config.Conf.DataFilePath = confDataFilePath
			tmpDir = filepath.Dir(config.Conf.DataFilePath) + "/" + "temp"
			config.Conf.ResultFilePath = confResultFilePath
		}
	}
	fmt.Println("data :file ", config.Conf.DataFilePath)
	fmt.Println("result :file ", config.Conf.ResultFilePath)
	runtime.GOMAXPROCS(runtime.NumCPU())
	//recreate the temp directory from scratch
	err = os.RemoveAll(tmpDir)
	if err != nil {
		util.Log(err)
	}
	err = os.MkdirAll(tmpDir, 0777)
	if err != nil {
		util.Log(err)
	}
	startSplit := time.Now()
	var tmpFiles []*os.File
	if config.Conf.SplitMode == 1 {
		tmpFiles = split2TempFileWithCh(config.Conf.DataFilePath, config.Conf.SplitCount)
	} else if config.Conf.SplitMode == 0 {
		tmpFiles = split2TempFileWithRoutine(config.Conf.DataFilePath, config.Conf.SplitCount)
	}
	since := time.Since(startSplit)
	fmt.Println("split done:", since)
	startDedup := time.Now()
	if config.Conf.MergeMode == 0 {
		deduplicateMerge0(tmpFiles, config.Conf.ResultFilePath)
	} else if config.Conf.MergeMode == 1 {
		deduplicateMerge1(tmpFiles, config.Conf.ResultFilePath)
	}
	fmt.Println("merge done lines[" + strconv.FormatInt(writeResultLines, 10) + "]")
	fmt.Println("duplicated lines [" + strconv.FormatInt(writeTmpLines-writeResultLines, 10) + "]")
	fmt.Println("split time:", since)
	fmt.Println("dedup time:", time.Since(startDedup))
	fmt.Println("total time:", time.Since(startSplit))

	//clean up the temp directory
	err = os.RemoveAll(tmpDir)
	if err != nil {
		util.Log(err)
	}
}

//deduplicateMerge0 deduplicates each temp file in memory with a map and
//streams the unique lines through a channel to a single writer goroutine
//(merge) that produces resultFileName. It always returns nil; the return
//value is kept only for compatibility with existing callers.
func deduplicateMerge0(fileObjs []*os.File, resultFileName string) []string {
	wgFlag := &sync.WaitGroup{}
	fileObjsIdex := len(fileObjs)
	deduplicatedLinesChan := make(chan string, config.Conf.MergeChanSize4MergeMode0)
	go merge(deduplicatedLinesChan, resultFileName)
	wgFlag.Add(len(fileObjs))
	//close the channel once every per-file dedup goroutine has finished,
	//which lets merge() drain and terminate
	go func() {
		wgFlag.Wait()
		close(deduplicatedLinesChan)
	}()
	//periodic progress printer; runs until the process exits (main returns
	//shortly after the merge completes, so it is not explicitly stopped)
	if config.Conf.StatPrintInterval > 0 {
		go func() {
			fmt.Println("runing dedup thread::", atomic.LoadInt32(&mergeThreadCount))
			fmt.Println("cache lines:", len(deduplicatedLinesChan))
			for {
				<-time.After(time.Second * time.Duration(config.Conf.StatPrintInterval))
				fmt.Println("runing dedup thread:", atomic.LoadInt32(&mergeThreadCount))
				fmt.Println("cache lines:", len(deduplicatedLinesChan))
				fmt.Printf("all files:%d,done count :%d\n", config.Conf.SplitCount, atomic.LoadInt32(&tmpDedupDoneCount))
			}
		}()
	}

	//launch at most MergeThreadCount concurrent per-file dedup goroutines
	for {
		if atomic.LoadInt32(&mergeThreadCount) < int32(config.Conf.MergeThreadCount) {
			fileObjsIdex--
			if fileObjsIdex < 0 {
				break
			}
			atomic.AddInt32(&mergeThreadCount, 1)
			go func(file *os.File) {
				defer wgFlag.Done()
				defer atomic.AddInt32(&mergeThreadCount, -1)
				if _, err := file.Seek(0, 0); err != nil {
					util.Log(err)
				}
				//set keyed by the full line (including '\n') deduplicates the file;
				//struct{} values avoid the per-entry pointer of the original
				//map[string]*struct{}
				m := make(map[string]struct{})
				reader := bufio.NewReaderSize(file, config.Conf.MergeReadWriteBufSize)
				for {
					line, err := reader.ReadString('\n')
					if err != nil {
						if err == io.EOF {
							//keep a possible final line without a trailing newline
							if strings.TrimSpace(line) != "" {
								m[line] = struct{}{}
							}
							break
						}
						util.Log(err)
					}
					m[line] = struct{}{}
				}
				atomic.AddInt32(&tmpDedupDoneCount, 1)
				//hand the unique lines to the writer goroutine
				for dedupLine := range m {
					deduplicatedLinesChan <- dedupLine
				}
			}(fileObjs[fileObjsIdex])
		} else {
			//all worker slots busy: back off briefly. The original passed a
			//bare 10 to time.After (10 nanoseconds), which busy-spun the CPU.
			time.Sleep(10 * time.Millisecond)
		}
	}
	//block until merge() signals that the result file has been written
	for range doneCh {
	}
	return nil
}

//deduplicateMerge1 deduplicates each temp file in memory with a map,
//rewrites the unique lines back to a same-named temp file, and then
//concatenates the rewritten files into resultFileName using io.Copy.
func deduplicateMerge1(fileObjs []*os.File, resultFileName string) {
	depDoneFileObjCh := make(chan *os.File, config.Conf.DoneFileChanSize4MergeMode1)
	wgFlag := &sync.WaitGroup{}
	fileObjsIdex := len(fileObjs)
	wgFlag.Add(len(fileObjs))
	//periodic progress printer; runs until the process exits
	if config.Conf.StatPrintInterval > 0 {
		go func() {
			fmt.Println("runing threads:", atomic.LoadInt32(&mergeThreadCount))
			for {
				<-time.After(time.Second * time.Duration(config.Conf.StatPrintInterval))
				fmt.Println("runing threads:", atomic.LoadInt32(&mergeThreadCount))
				fmt.Printf("all files:%d,done count :%d\n", config.Conf.SplitCount, atomic.LoadInt32(&tmpDedupDoneCount))
				fmt.Printf("merge file chan cap:%d \n", len(depDoneFileObjCh))
			}
		}()
	}
	//close the done-file channel once every per-file goroutine finishes so
	//the merger below can drain and exit
	go func() {
		wgFlag.Wait()
		close(depDoneFileObjCh)
	}()
	//merger goroutine: append each finished temp file to the result file
	go func() {
		//remove any stale result file first
		_ = os.Remove(resultFileName)
		resultFile, err := os.OpenFile(resultFileName, os.O_CREATE|os.O_RDWR, 0777)
		//signal completion when the merge loop ends
		defer func() {
			resultFile.Close()
			doneCh <- struct{}{}
			close(doneCh)
		}()
		if err != nil {
			util.Log(err)
		}
		//one buffered writer over the whole result file (the original
		//allocated a fresh writer per input file)
		writer := bufio.NewWriterSize(resultFile, config.Conf.MergeReadWriteBufSize)
		//receive the pointers of files whose dedup rewrite has finished
		for doneFile := range depDoneFileObjCh {
			//doneFile is already closed by the producer, so reopen by name;
			//a fresh os.Open is positioned at offset 0
			open, err := os.Open(doneFile.Name())
			if err != nil {
				util.Log(err)
			}
			reader := bufio.NewReaderSize(open, config.Conf.MergeReadWriteBufSize)
			if _, err = io.Copy(writer, reader); err != nil {
				util.Log(err)
			}
			open.Close()
		}
		if err = writer.Flush(); err != nil {
			util.Log(err)
		}
	}()
	//launch at most MergeThreadCount concurrent per-file dedup goroutines
	for {
		if atomic.LoadInt32(&mergeThreadCount) < int32(config.Conf.MergeThreadCount) {
			fileObjsIdex--
			if fileObjsIdex < 0 {
				break
			}
			//one more worker running
			atomic.AddInt32(&mergeThreadCount, 1)
			go func(file *os.File) {
				defer wgFlag.Done()
				//one fewer worker running
				defer atomic.AddInt32(&mergeThreadCount, -1)
				//rewind to the start of the temp file
				if _, err := file.Seek(0, 0); err != nil {
					util.Log(err)
				}
				//set keyed by the full line (including '\n')
				m := make(map[string]struct{})
				reader := bufio.NewReaderSize(file, config.Conf.MergeReadWriteBufSize)
				for {
					line, err := reader.ReadString('\n')
					if err != nil {
						if err == io.EOF {
							//keep a possible final line without a trailing newline
							if strings.TrimSpace(line) != "" {
								m[line] = struct{}{}
							}
							break
						}
						util.Log(err)
					}
					m[line] = struct{}{}
				}
				//dedup done: replace the temp file with its unique lines
				if err := file.Close(); err != nil {
					util.Log(err)
				}
				if err := os.Remove(file.Name()); err != nil {
					util.Log(err)
				}
				//recreate a file with the same name for the rewrite
				file2, err := os.OpenFile(file.Name(), os.O_CREATE|os.O_RDWR, 0666)
				if err != nil {
					util.Log(err)
				}
				writer2 := bufio.NewWriterSize(file2, config.Conf.MergeReadWriteBufSize)
				atomic.AddInt64(&writeResultLines, int64(len(m)))
				for dedupLine := range m {
					if _, err := writer2.WriteString(dedupLine); err != nil {
						util.Log(err)
					}
				}
				if err := writer2.Flush(); err != nil {
					util.Log(err)
				}
				file2.Close()
				atomic.AddInt32(&tmpDedupDoneCount, 1)
				//tell the merger this file is ready to be appended
				depDoneFileObjCh <- file2
			}(fileObjs[fileObjsIdex])
		} else {
			//all worker slots busy: back off briefly. The original passed a
			//bare 100 to time.After (100 nanoseconds), which busy-spun the CPU.
			time.Sleep(10 * time.Millisecond)
		}
	}
	//wait for the merger goroutine to finish writing the result file
	for range doneCh {
	}
}

//merge drains deduplicated lines from the channel, writes them to
//resultFileName, then signals completion on doneCh and closes it.
func merge(deduplicatedLinesChan <-chan string, resultFileName string) {
	//remove any stale result file, then create it fresh
	_ = os.Remove(resultFileName)
	resultFile, err := os.OpenFile(resultFileName, os.O_CREATE|os.O_RDWR, 0777)
	if err != nil {
		util.Log(err)
	}
	//the original leaked this handle; Close on a nil *os.File is safe
	defer resultFile.Close()
	writer := bufio.NewWriterSize(resultFile, config.Conf.MergeReadWriteBufSize)
	for resultLine := range deduplicatedLinesChan {
		if _, err := writer.WriteString(resultLine); err != nil {
			util.Log(err)
		}
		//this counter is read concurrently by the stats goroutine and is
		//updated atomically elsewhere, so use atomic here too
		atomic.AddInt64(&writeResultLines, 1)
	}
	if err = writer.Flush(); err != nil {
		util.Log(err)
	}
	doneCh <- struct{}{}
	close(doneCh)
	util.Info("merge done lines[" + strconv.FormatInt(writeResultLines, 10) + "]")
	util.Info("duplicated lines [" + strconv.FormatInt(writeTmpLines-writeResultLines, 10) + "]")
}

//inject2TmpFileWithCh drains lines from ch and routes each one to the temp
//writer selected by hash(line)%SplitCount, so identical lines always land
//in the same temp file. All writers are flushed once ch is closed.
func inject2TmpFileWithCh(tmpWriter []*bufio.Writer, ch <-chan []byte, wg *sync.WaitGroup) {
	defer wg.Done()
	for line := range ch {
		//NOTE(review): assumes int(util.Hash1(line)) is non-negative; a
		//negative value would make numCh a negative index — confirm against
		//util.Hash1's return type
		numCh := int(util.Hash1(line)) % config.Conf.SplitCount
		if _, err := tmpWriter[numCh].Write(line); err != nil {
			util.Log(err)
		}
		//read concurrently by the stats printer, so update atomically
		//(matches inject2TmpFileWithRoutine)
		atomic.AddInt64(&writeTmpLines, 1)
	}
	for _, writer := range tmpWriter {
		if err := writer.Flush(); err != nil {
			util.Log(err)
		}
	}
}

//split2TempFileWithCh splits the data file into n temp files by pushing
//every line through a single channel to one writer goroutine that routes
//lines by hash. Returns the open temp files for the merge phase.
func split2TempFileWithCh(dataFilePath string, n int) []*os.File {
	wg := &sync.WaitGroup{} //waits for the temp-file writer goroutine
	tmpFilesWriter := make([]*bufio.Writer, n)
	tmpFiles := make([]*os.File, n) //returned to the caller for the merge phase
	tmpFilePrefix := tmpDir + "/tmp"
	file, err := os.Open(dataFilePath)
	if err != nil {
		util.Log(err)
	}
	//deferred only after the error check; the original deferred Close on a
	//possibly nil handle before checking err
	defer file.Close()

	//single channel carrying raw lines read from the data file
	splitContentChan := make(chan []byte, config.Conf.SplitChanSize)
	//periodic progress printer, stopped via splitDoneCh when splitting ends
	if config.Conf.StatPrintInterval > 0 {
		go func() {
			for {
				select {
				case <-time.After(time.Second * time.Duration(config.Conf.StatPrintInterval)):
					fmt.Println("write resultLines :", writeTmpLines)
					fmt.Println("cache lines:", len(splitContentChan))
				case <-splitDoneCh:
					return
				}
			}
		}()
	}

	//create the n temp files and their buffered writers
	for i := 0; i < n; i++ {
		tmpFileName := tmpFilePrefix + strconv.Itoa(i) + ".txt"
		tempFile, err := os.OpenFile(tmpFileName, os.O_CREATE|os.O_RDWR, 0777)
		if err != nil {
			util.Log(err)
		}
		tmpFiles[i] = tempFile
		tmpFilesWriter[i] = bufio.NewWriterSize(tempFile, config.Conf.SplitReadWriteBufSize)
	}
	wg.Add(1)
	go inject2TmpFileWithCh(tmpFilesWriter, splitContentChan, wg)
	reader := bufio.NewReaderSize(file, config.Conf.SplitReadWriteBufSize)
	//read the data file line by line and feed the channel
	for {
		line, err := reader.ReadBytes('\n')
		if err != nil {
			if err == io.EOF {
				//keep a possible final line without a trailing newline
				if len(line) != 0 {
					splitContentChan <- line
					readLines++
				}
				util.Info("data file read done")
				break
			}
			util.Log(err)
		}
		splitContentChan <- line
		readLines++
	}
	close(splitContentChan)
	//wait until the writer goroutine has drained and flushed everything
	wg.Wait()
	//stop the progress printer
	if config.Conf.StatPrintInterval > 0 {
		splitDoneCh <- struct{}{}
		close(splitDoneCh)
	}
	util.Info("read lines :" + strconv.Itoa(readLines))
	util.Info("write lines :" + strconv.FormatInt(writeTmpLines, 10))
	return tmpFiles
}

//split2TempFileWithRoutine splits the data file into n temp files, giving
//each temp file its own channel and writer goroutine; the reader routes
//every line by hash(line)%n. Returns the open temp files for the merge phase.
func split2TempFileWithRoutine(dataFilePath string, n int) []*os.File {
	wg := &sync.WaitGroup{}         //waits for all temp-file writer goroutines
	tmpFiles := make([]*os.File, 0) //returned to the caller for the merge phase
	tmpFilePrefix := tmpDir + "/tmp"
	file, err := os.Open(dataFilePath)
	if err != nil {
		util.Log(err)
	}
	//deferred only after the error check; the original deferred Close on a
	//possibly nil handle before checking err
	defer file.Close()
	//one channel per temp file
	splitContentChans := make([]chan string, n)
	for i := range splitContentChans {
		splitContentChans[i] = make(chan string, config.Conf.SplitChanSize)
	}
	//create the temp files and start one writer goroutine per file
	for i := 0; i < n; i++ {
		tmpFileName := tmpFilePrefix + strconv.Itoa(i) + ".txt"
		tempFile, err := os.OpenFile(tmpFileName, os.O_CREATE|os.O_RDWR, 0777)
		if err != nil {
			util.Log(err)
		}
		tmpFiles = append(tmpFiles, tempFile)
		wg.Add(1)
		go inject2TmpFileWithRoutine(tempFile, splitContentChans[i], wg)
	}
	reader := bufio.NewReaderSize(file, config.Conf.SplitReadWriteBufSize)
	//read the data file line by line and route each line by hash
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				//keep a possible final line without a trailing newline.
				//NOTE(review): assumes int(util.HashString(line)) is
				//non-negative, otherwise the channel index goes negative —
				//confirm against util.HashString's return type
				if strings.TrimSpace(line) != "" {
					numCh := int(util.HashString(line)) % config.Conf.SplitCount
					splitContentChans[numCh] <- line
					readLines++
				}
				util.Info("data file read done")
				break
			}
			util.Log(err)
		}
		numCh := int(util.HashString(line)) % config.Conf.SplitCount
		readLines++
		splitContentChans[numCh] <- line
	}
	//closing every channel lets the writer goroutines flush and exit
	for _, ch := range splitContentChans {
		close(ch)
	}
	wg.Wait()
	util.Info("read lines :" + strconv.Itoa(readLines))
	util.Info("write lines :" + strconv.FormatInt(writeTmpLines, 10))
	return tmpFiles
}

//inject2TmpFileWithRoutine consumes the lines arriving on this temp file's
//dedicated channel and appends them to tmpFile through a buffered writer,
//flushing once the channel is closed.
func inject2TmpFileWithRoutine(tmpFile *os.File, ch <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	bufWriter := bufio.NewWriterSize(tmpFile, config.Conf.SplitReadWriteBufSize)
	for pending := range ch {
		_, writeErr := bufWriter.WriteString(pending)
		atomic.AddInt64(&writeTmpLines, 1)
		if writeErr != nil {
			util.Log(writeErr)
		}
	}
	if flushErr := bufWriter.Flush(); flushErr != nil {
		util.Log(flushErr)
	}
}
